Kafka Consumer API

1
public class KafkaConsumer<K,V> extends java.lang.Object implements Consumer<K,V>

用于消费记录。

显式处理broker失效、适应分区在集群内转移,允许通过消费组负载均衡。

为了获取数据,维护与broker的连接。关闭失败将导致连接泄漏。

不是线程安全的,多线程使用详见Multi-threaded Processing

1 跨版本兼容

能够与版本>=0.10.0的broker通信,但不支持某些特性(抛出异常UnsupportedVersionException)。

2 偏移和消费位置

offset唯一标识记录在每个分区中的位置。

两个重要的相关概念:

  • 消费位置:offset指示了即将消费的下一位置。每当消费者接受记录(调用poll(Duration))后自动后移。
  • 提交位置:最近被安全存储的偏移位置。如果进程失效或重启,消费者将从该偏移恢复。消费者可以周期性提交偏移,或者人工调用API设置(commitSynccommitAsync

者两种位置是消费者可以控制消费完成的时机。

3 消费组和主题订阅

Kafka使用消费组实现伸缩性和容错。消费进程可以在单一或集群机器上运行。拥有相同group.id的消费者视为同一组。

每个消费者可以通过Subscribe API动态设置订阅的主题。主题的分区在同一消费组中的所有订阅该主题的消费者中负载均衡。

某个消费者失效后,原先分配给它的分区将重新分配给同一组中的其他消费者。同理,添加新的消费者时,将其他消费者上的分区均分给它。这个过程叫rebalancing。rebalancing同样用于主题的变化。当主题增加新的分区,或者创建了匹配subscribed regex 的主题,消费组将自动周期性刷新,并分配新的分区给成员。

消费组可以视为具有多个消费进程的单一逻辑订阅者。由于消费者相当廉价,Kafka支持在没有数据冗余的情况下任意多个消费组。

消费者可以通过 ConsumerRebalanceListener做一些应用级别的逻辑,如状态清理、人为提交偏移等。详见Storing Offsets Outside Kafka

4 检测消费者失效

当消费者调用poll(Duration)后,消费者将自动加入消费组。poll API用于确保消费者存活。只要继续调用,消费者就会被包含在组内。底层逻辑是消费者定期发送心跳包给服务器,当接收时长超过session.timeout.ms,消费者将被判定为失效,其拥有的分区将被分发。

消费者有时会遇到“livelock”:只发送心跳包,但不作为。Kafka通过限制poll调用的时间间隔max.poll.interval.ms来避免。超过最大间隔后,调用commitSync()时抛出异常CommitFailedException。这个机制确保了只有活跃的组成员才能提交偏移。因此只有不断调用poll才能留在组内。

消费者使用以下两种配置控制poll循环行为:

  • max.poll.interval.ms

    由于rebalance是在poll中使用,调整检测时间将同样调整rebalance时间,可能延缓对消费者失效的处理。

  • max.poll.records

    限制单次调用poll返回的记录数量。可以间接地调整poll时间间隔。

由于消息处理的时间差异难以预测,两种方式都有效。

为了让消费者在消息处理的同时调用poll,建议将消息处理移动到另一个线程中。需要确保提交的偏移不能在实际位置之前。通常需要关闭自动提交,改为手动提交。只有确认处理线程处理完成后才能提交。在处理线程返回之前,需要pause分区,以使从poll没有新的记录被接收。

5 用例

(1) 自动偏移提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Properties props = new Properties();
// 指定broker列表
props.put("bootstrap.servers", "localhost:9092");
// 分配消费组id
props.put("group.id", "test");
// 启用自动提交
props.put("enable.auto.commit", "true");
// 设置自动提交时间间隔
props.put("auto.commit.interval.ms", "1000");
// 设置反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

(2) 人工偏移提交

允许人工确认处理状态,用于具有复杂处理逻辑的场景,实现”at least once”语义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 关闭自动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
// 最小批处理数据量
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// 复杂处理逻辑
insertIntoDb(buffer);
// 人工提交全部成功
consumer.commitSync();
buffer.clear();
}
}

注意:自动提交也可以实现“at least once”语义。在下一次调用poll()或关闭消费者时才认为上一次处理完成。

人工提交也可以逐条记录提交,实现更细粒度的提交。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 逐个分区处理记录
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
// 提交的分区应该是下一个消费位置
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

(3) 人工分区分配

当处理过程涉及本地信息,或者处理过程本身就是高可用的,不需要Kafka再提供时。

1
2
3
4
5
6
String topic = "foo";
// 为消费者执行消费的主题+分区
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
// 订阅方法为assign()
consumer.assign(Arrays.asList(partition0, partition1));

注意:

group.id仍可用于偏移提交

只能通过assign()重新指定分区

为了避免偏移提交冲突,确保消费者间组ID唯一

不能在主题订阅时混淆动态分配和人工分配

(4) 外部存储偏移

http://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

参考资料

Class KafkaConsumer

elodina/scala-kafka